AWSのフルマネージドサービスのみ使ってIoT向けビッグデータ基盤を構築する

AWSのフルマネージドサービスのみ使ってIoT向けビッグデータ基盤を構築する

Clock Icon2015.04.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

KinesisやRedshiftを使ったビッグデータ基盤の運用負荷を減らせないかと考えていたところ、Lambdaが正式公開されましたのでAWSのフルマネージドサービスだけで構築できないかやってみました。今回、構築したのはスマホアプリやIntel Edisonなどの端末からセンサー情報をKinesisに送信後、S3を経由してRedshiftにデータをロードするところまでになります。処理としては以下の図のような流れになります。Kinesisにデータを送信する箇所とデータ分析の部分は含まれていません。今回まだLambdaが東京リージョンでは使えないので、N.Virginiaリージョンで試しました。

fullmanaged-bigdata_6

Kinesisから取得したデータをCSVにしてS3へアップロードする

デバイスからKinesisに送信されたデータを取得してCSVファイルを作成、S3にアップロードしています。

準備する

Kinesisのストリームを作成
まず最初にKinesisのストリームを作成します。私はmystreamという名前にしました。
CSVをアップロードするバケットを作成
RedshiftにデータをロードするためにKinesisから取得したデータをCSVにしてS3にアップロードする必要があります。CSVを置いておくバケットを作成しましょう。私はmy-lambda-sampleという名前にしました。
IAMロールの作成
KinesisとS3が利用できるIAMロールを作成します。以下がPolicyになります。
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:*"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::*"
      ]
    }
  ]
}

Lambdaファンクションの実装

Kinesisからデータを取得してCSVに変換し、S3にアップロードするファンクションです。今回はCSVはソースコードを短く分かりやすくしたかったのでやっていませんが、CSVは圧縮した方がいいと思います。Management Consoleで実行するとS3のmy-lambda-sampleにtable1.csvというデータがアップロードされます。data checkというコメントの部分でデータのチェックやCSVへの変換処理を行う想定です。

var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});

exports.handler = function(event, context) {
	var data = '';
	event.Records.forEach(function(record) {
		payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
		// data check
		data += payload + '\n';
	});
    
	s3.putObject({
		Bucket:'my-lambda-sample',
		Key:'table1.csv',
		Body: data},
		function (err) {
		    if(err){
				context.succeed('error:' + err);    
			}else{
				context.succeed('success');
			}
		}
	);
};

Lambdaは以下のようにManagement Console上で編集することができます。ソースコードを張り付けて名前とRoleを設定後、保存したら完了です。

fullmanaged-bigdata_8

イベントの登録

どのような場合にLambdaファンクションが実行されるのか定義します。今回はKinesisに新しいデータが5件追加されたらイベントが発生することにします。Kinesis streamのプルダウンにはLambdaと同じリージョンのストリームしか表示されないので注意しましょう。Lambdaはまだ東京リージョンにないからLambdaだけ別のリージョンにする、というのはできません。

fullmanaged-bigdata_7

S3にアップロードしたCSVファイルをRedshiftにロードする

準備する

Redshiftのクラスターとテーブルの作成
最後にRedshiftの準備をします。クラスターを作成しテーブルを作成します。
CREATE TABLE public.table1 (
	column1 int,
	column2 VARCHAR(30)
);
IAMロールの作成
S3とRedshiftが利用できるIAMロールを作成します。
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:*"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "redshift:*"
      ],

      "Resource": "*"
    }
  ]
}

Lambdaファンクションの実装

以下がS3にアップロードしたCSVファイルをRedshiftにロードするLambdaファンクションになります。 ソースコードに直接APIキーを埋め込んでいるのが気になる方は回避する方法があるので以下の記事をご覧ください。 AWS LambdaからIAM RoleのCredential情報を取得し、RedshiftのCOPY処理に利用する

var pg = require('pg');

exports.handler = function(event, context) {
	var bucket = event.Records[0].s3.bucket.name;
	var key = event.Records[0].s3.object.key;
	console.log(bucket);
	console.log(key);

	var target = 'tcp://<接続ユーザー名>:<接続パスワード>@<接続時のエンドポイント>:<接続ポート番号>/<接続DB名>';
	var client = new pg.Client(target);

	client.connect();
	var queryString = "COPY public.table1 FROM 's3://" + bucket + "/" + key + "' ";
	queryString += "CREDENTIALS 'aws_access_key_id=<アクセスキー>;aws_secret_access_key=<シークレットアクセスキー>' ";
	queryString += 'CSV;';
	queryString += 'commit;';

	client.query(queryString,
		function (err) {
			if(err){
				context.succeed('error:' + err);
			}else{
				context.succeed('success');
			}
		}
	);
};

先ほどはManagement Consoleからやりましたが今度はpgというモジュールを使用しているので、ローカルで実装したものをzipにしてアップロードしています。npmコマンドを使ってモジュールをインストールする方法やモジュールをzipファイルに含めてアップロードする方法は以下の記事を参考にしました。デフォルトだと3秒でタイムアウトするので60秒に設定しています。

初めてのJavaScript、初めてのAWS Lambda

fullmanaged-bigdata_3

イベントの登録

どのような場合にLambdaファンクションが実行されるのか定義します。今回はS3のmy-lambda-sampleというバケットにファイルがアップロードされたらRedshiftにロードするCSVファイルと見なしてロードを開始しています。BucketのプルダウンにはLambdaと同じリージョンのBucketしか選択できないので注意しましょう。

fullmanaged-bigdata_4

動作を確認する

実装できたので確認してみましょう。確認にはAWS CLIを使います。以下のコマンドを実行するとKinesisにレコードが5件追加されますが、これによりRedshiftのtable1にデータが5件追加されていれば成功です。

実行したコマンドは以下の記事を参考にしました。 Kinesisの複数PUT対応APIを試してみた

$ echo '{
  "Records": [
    {
      "Data": "1,aaa",
      "PartitionKey": "sample1"
    },
    {
      "Data": "2,bbb",
      "PartitionKey": "sample2"
    },
    {
      "Data": "3,ccc",
      "PartitionKey": "sample3"
    },
    {
      "Data": "4,ddd",
      "PartitionKey": "sample4"
    },
    {
      "Data": "5,eee",
      "PartitionKey": "sample5"
    }
  ],
  "StreamName": "mystream"
}' > sample.json
$ aws kinesis put-records --cli-input-json file://sample.json

まとめ

私は今回Lambdaに初めて触ったので時間がかかりましたが、慣れればすばやく構築できるようになるのではないかと思います。Javascriptのイベントオブジェクトから追加されたファイルやレコードの情報が取得できるのがとても便利でした。EC2を立てずにフルマネージドサービスだけで運用できれば負荷を減らすことができそうですね。早く東京リージョンで使えるようになって欲しいです。今後、実際にこのような構成で運用する場合はどの程度の負荷まで耐えられるのかは分からないので、実際に運用する際には負荷試験をしっかりやってからにしたいと思います!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.